package com.lagou.bak;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class SinkToMySql extends RichSinkFunction<Students> {
    PreparedStatement preparedStatement = null;
    Connection connection = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        String url = "jdbc:mysql://localhost:3306/bigdata?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC";
        String user = "root";
        String password = "lucas";
        connection = DriverManager.getConnection(url, user, password);
        String sql = "insert into student (name,age) values (?,?)";
        preparedStatement = connection.prepareStatement(sql);

    }

    @Override
    public void invoke(Students stu, Context context) throws Exception {
        preparedStatement.setString(1,stu.getName());
        preparedStatement.setInt(2,stu.getAge());
        preparedStatement.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if(connection != null) {
            connection.close();
        }
        if (preparedStatement != null) {
            preparedStatement.close();
        }
    }
}
